package com.xiang.ad.mysql.listener;

import com.github.shyiko.mysql.binlog.event.EventType;
import com.xiang.ad.mysql.constant.Constant;
import com.xiang.ad.mysql.constant.OpType;
import com.xiang.ad.mysql.dto.BinlogRowData;
import com.xiang.ad.mysql.dto.MySqlRowData;
import com.xiang.ad.mysql.dto.TableTemplate;
import com.xiang.ad.sender.ISender;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Created by xiang.
 * 增量数据的处理器
 * 1、为不同的表 注册不同的监听器，以监听binlog
 * 2、投递以实现增量数据索引的更新
 */
@Slf4j
@Component
public class IncrementListener implements Ilistener {

    //投递接口
    @Resource(name = "kafkaSender") //根据name选择注入
    private ISender sender;

    //监听并解析binlog
    private final AggregationListener aggregationListener;

    //
    @Autowired
    public IncrementListener(AggregationListener aggregationListener) {
        this.aggregationListener = aggregationListener;
    }

    //因为需要对不同的表定义不同的增量数据的更新方法
    //注册不同的监听器
    @Override
    @PostConstruct //注册监听器应该发生在 IncrementListener实例化的时候,如果不这样就会丢失一些binlog
    public void register() {

        log.info("IncrementListener register db and table info");
        //key是表的名字 value是数据库的名字
        Constant.table2Db.forEach((k, v) ->
                //这里就翻过来写
        aggregationListener.register(v, k, this));
    }

    /**
     * 实现增量数据索引的更新
     * 1、将BinlogRowData转换成定义好的MySqlRowData
     * 2、将MySqlRowData投递出去
     */

    @Override
    public void onEvent(BinlogRowData eventData) {

        TableTemplate table = eventData.getTable();
        EventType eventType = eventData.getEventType();

        // 包装成最后需要投递的数据
        MySqlRowData rowData = new MySqlRowData();

        // 填充字段
        rowData.setTableName(table.getTableName());
        rowData.setLevel(eventData.getTable().getLevel());
        OpType opType = OpType.to(eventType);//操作类型的转换
        rowData.setOpType(opType);

        // 取出模板中该操作对应的字段列表
        List<String> fieldList = table.getOpTypeFieldSetMap().get(opType);
        if (null == fieldList) {
            log.warn("{} not support for {}", opType, table.getTableName());
            return;
        }

        // 发生变化的map填充
        for (Map<String, String> afterMap : eventData.getAfter()) {

            //最终需要去转换的mysqlRowData里面也包含了map
            Map<String, String> _afterMap = new HashMap<>();

            //
            for (Map.Entry<String, String> entry : afterMap.entrySet()) {

                String colName = entry.getKey();
                String colValue = entry.getValue();

                _afterMap.put(colName, colValue);
            }

            // 变化的map添加进来
            rowData.getFieldValueMap().add(_afterMap);
        }

        // 投递增量数据
        sender.sender(rowData);
    }
}
